1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package rx;
17
18 import static org.junit.Assert.assertEquals;
19 import static org.junit.Assert.assertNotNull;
20 import static org.junit.Assert.assertTrue;
21 import static org.junit.Assert.fail;
22 import static org.mockito.Matchers.any;
23 import static org.mockito.Matchers.anyInt;
24 import static org.mockito.Matchers.anyString;
25 import static org.mockito.Matchers.isA;
26 import static org.mockito.Mockito.inOrder;
27 import static org.mockito.Mockito.mock;
28 import static org.mockito.Mockito.never;
29 import static org.mockito.Mockito.times;
30 import static org.mockito.Mockito.verify;
31
32 import java.util.ArrayList;
33 import java.util.Arrays;
34 import java.util.LinkedList;
35 import java.util.List;
36 import java.util.NoSuchElementException;
37 import java.util.concurrent.CountDownLatch;
38 import java.util.concurrent.TimeUnit;
39 import java.util.concurrent.atomic.AtomicInteger;
40 import java.util.concurrent.atomic.AtomicReference;
41
42 import org.junit.Before;
43 import org.junit.Test;
44 import org.mockito.InOrder;
45 import org.mockito.Mock;
46 import org.mockito.MockitoAnnotations;
47
48 import rx.Observable.OnSubscribe;
49 import rx.Observable.Transformer;
50 import rx.exceptions.OnErrorNotImplementedException;
51 import rx.functions.Action1;
52 import rx.functions.Action2;
53 import rx.functions.Func0;
54 import rx.functions.Func1;
55 import rx.functions.Func2;
56 import rx.observables.ConnectableObservable;
57 import rx.observers.TestSubscriber;
58 import rx.schedulers.TestScheduler;
59 import rx.subjects.ReplaySubject;
60 import rx.subjects.Subject;
61 import rx.subscriptions.BooleanSubscription;
62
63 public class ObservableTests {
64
65 @Mock
66 Observer<Integer> w;
67
68 private static final Func1<Integer, Boolean> IS_EVEN = new Func1<Integer, Boolean>() {
69 @Override
70 public Boolean call(Integer value) {
71 return value % 2 == 0;
72 }
73 };
74
75 @Before
76 public void before() {
77 MockitoAnnotations.initMocks(this);
78 }
79
80 @Test
81 public void fromArray() {
82 String[] items = new String[] { "one", "two", "three" };
83 assertEquals(new Integer(3), Observable.from(items).count().toBlocking().single());
84 assertEquals("two", Observable.from(items).skip(1).take(1).toBlocking().single());
85 assertEquals("three", Observable.from(items).takeLast(1).toBlocking().single());
86 }
87
88 @Test
89 public void fromIterable() {
90 ArrayList<String> items = new ArrayList<String>();
91 items.add("one");
92 items.add("two");
93 items.add("three");
94
95 assertEquals(new Integer(3), Observable.from(items).count().toBlocking().single());
96 assertEquals("two", Observable.from(items).skip(1).take(1).toBlocking().single());
97 assertEquals("three", Observable.from(items).takeLast(1).toBlocking().single());
98 }
99
100 @Test
101 public void fromArityArgs3() {
102 Observable<String> items = Observable.just("one", "two", "three");
103
104 assertEquals(new Integer(3), items.count().toBlocking().single());
105 assertEquals("two", items.skip(1).take(1).toBlocking().single());
106 assertEquals("three", items.takeLast(1).toBlocking().single());
107 }
108
109 @Test
110 public void fromArityArgs1() {
111 Observable<String> items = Observable.just("one");
112
113 assertEquals(new Integer(1), items.count().toBlocking().single());
114 assertEquals("one", items.takeLast(1).toBlocking().single());
115 }
116
117 @Test
118 public void testCreate() {
119
120 Observable<String> observable = Observable.create(new OnSubscribe<String>() {
121
122 @Override
123 public void call(Subscriber<? super String> Observer) {
124 Observer.onNext("one");
125 Observer.onNext("two");
126 Observer.onNext("three");
127 Observer.onCompleted();
128 }
129
130 });
131
132 @SuppressWarnings("unchecked")
133 Observer<String> observer = mock(Observer.class);
134 observable.subscribe(observer);
135 verify(observer, times(1)).onNext("one");
136 verify(observer, times(1)).onNext("two");
137 verify(observer, times(1)).onNext("three");
138 verify(observer, never()).onError(any(Throwable.class));
139 verify(observer, times(1)).onCompleted();
140 }
141
142 @Test
143 public void testCountAFewItems() {
144 Observable<String> observable = Observable.just("a", "b", "c", "d");
145 observable.count().subscribe(w);
146
147 verify(w, times(1)).onNext(anyInt());
148 verify(w).onNext(4);
149 verify(w, never()).onError(any(Throwable.class));
150 verify(w, times(1)).onCompleted();
151 }
152
153 @Test
154 public void testCountZeroItems() {
155 Observable<String> observable = Observable.empty();
156 observable.count().subscribe(w);
157
158 verify(w, times(1)).onNext(anyInt());
159 verify(w).onNext(0);
160 verify(w, never()).onError(any(Throwable.class));
161 verify(w, times(1)).onCompleted();
162 }
163
164 @Test
165 public void testCountError() {
166 Observable<String> o = Observable.create(new OnSubscribe<String>() {
167 @Override
168 public void call(Subscriber<? super String> obsv) {
169 obsv.onError(new RuntimeException());
170 }
171 });
172 o.count().subscribe(w);
173 verify(w, never()).onNext(anyInt());
174 verify(w, never()).onCompleted();
175 verify(w, times(1)).onError(any(RuntimeException.class));
176 }
177
178 public void testTakeFirstWithPredicateOfSome() {
179 Observable<Integer> observable = Observable.just(1, 3, 5, 4, 6, 3);
180 observable.takeFirst(IS_EVEN).subscribe(w);
181 verify(w, times(1)).onNext(anyInt());
182 verify(w).onNext(4);
183 verify(w, times(1)).onCompleted();
184 verify(w, never()).onError(any(Throwable.class));
185 }
186
187 @Test
188 public void testTakeFirstWithPredicateOfNoneMatchingThePredicate() {
189 Observable<Integer> observable = Observable.just(1, 3, 5, 7, 9, 7, 5, 3, 1);
190 observable.takeFirst(IS_EVEN).subscribe(w);
191 verify(w, never()).onNext(anyInt());
192 verify(w, times(1)).onCompleted();
193 verify(w, never()).onError(any(Throwable.class));
194 }
195
196 @Test
197 public void testTakeFirstOfSome() {
198 Observable<Integer> observable = Observable.just(1, 2, 3);
199 observable.take(1).subscribe(w);
200 verify(w, times(1)).onNext(anyInt());
201 verify(w).onNext(1);
202 verify(w, times(1)).onCompleted();
203 verify(w, never()).onError(any(Throwable.class));
204 }
205
206 @Test
207 public void testTakeFirstOfNone() {
208 Observable<Integer> observable = Observable.empty();
209 observable.take(1).subscribe(w);
210 verify(w, never()).onNext(anyInt());
211 verify(w, times(1)).onCompleted();
212 verify(w, never()).onError(any(Throwable.class));
213 }
214
215 @Test
216 public void testFirstOfNone() {
217 Observable<Integer> observable = Observable.empty();
218 observable.first().subscribe(w);
219 verify(w, never()).onNext(anyInt());
220 verify(w, never()).onCompleted();
221 verify(w, times(1)).onError(isA(NoSuchElementException.class));
222 }
223
224 @Test
225 public void testFirstWithPredicateOfNoneMatchingThePredicate() {
226 Observable<Integer> observable = Observable.just(1, 3, 5, 7, 9, 7, 5, 3, 1);
227 observable.first(IS_EVEN).subscribe(w);
228 verify(w, never()).onNext(anyInt());
229 verify(w, never()).onCompleted();
230 verify(w, times(1)).onError(isA(NoSuchElementException.class));
231 }
232
233 @Test
234 public void testReduce() {
235 Observable<Integer> observable = Observable.just(1, 2, 3, 4);
236 observable.reduce(new Func2<Integer, Integer, Integer>() {
237
238 @Override
239 public Integer call(Integer t1, Integer t2) {
240 return t1 + t2;
241 }
242
243 }).subscribe(w);
244
245 verify(w, times(1)).onNext(anyInt());
246 verify(w).onNext(10);
247 }
248
249
250
251
252 @Test(expected = NoSuchElementException.class)
253 public void testReduceWithEmptyObservable() {
254 Observable<Integer> observable = Observable.range(1, 0);
255 observable.reduce(new Func2<Integer, Integer, Integer>() {
256
257 @Override
258 public Integer call(Integer t1, Integer t2) {
259 return t1 + t2;
260 }
261
262 }).toBlocking().forEach(new Action1<Integer>() {
263
264 @Override
265 public void call(Integer t1) {
266
267 }
268 });
269
270 fail("Expected an exception to be thrown");
271 }
272
273
274
275
276
277
278 @Test
279 public void testReduceWithEmptyObservableAndSeed() {
280 Observable<Integer> observable = Observable.range(1, 0);
281 int value = observable.reduce(1, new Func2<Integer, Integer, Integer>() {
282
283 @Override
284 public Integer call(Integer t1, Integer t2) {
285 return t1 + t2;
286 }
287
288 }).toBlocking().last();
289
290 assertEquals(1, value);
291 }
292
293 @Test
294 public void testReduceWithInitialValue() {
295 Observable<Integer> observable = Observable.just(1, 2, 3, 4);
296 observable.reduce(50, new Func2<Integer, Integer, Integer>() {
297
298 @Override
299 public Integer call(Integer t1, Integer t2) {
300 return t1 + t2;
301 }
302
303 }).subscribe(w);
304
305 verify(w, times(1)).onNext(anyInt());
306 verify(w).onNext(60);
307 }
308
309 @Test
310 public void testOnSubscribeFails() {
311 @SuppressWarnings("unchecked")
312 Observer<String> observer = mock(Observer.class);
313 final RuntimeException re = new RuntimeException("bad impl");
314 Observable<String> o = Observable.create(new OnSubscribe<String>() {
315
316 @Override
317 public void call(Subscriber<? super String> t1) {
318 throw re;
319 }
320
321 });
322 o.subscribe(observer);
323 verify(observer, times(0)).onNext(anyString());
324 verify(observer, times(0)).onCompleted();
325 verify(observer, times(1)).onError(re);
326 }
327
328 @Test
329 public void testMaterializeDematerializeChaining() {
330 Observable<Integer> obs = Observable.just(1);
331 Observable<Integer> chained = obs.materialize().dematerialize();
332
333 @SuppressWarnings("unchecked")
334 Observer<Integer> observer = mock(Observer.class);
335 chained.subscribe(observer);
336
337 verify(observer, times(1)).onNext(1);
338 verify(observer, times(1)).onCompleted();
339 verify(observer, times(0)).onError(any(Throwable.class));
340 }
341
342
343
344
345
346
347
348
349 @Test
350 public void testCustomObservableWithErrorInObserverAsynchronous() throws InterruptedException {
351 final CountDownLatch latch = new CountDownLatch(1);
352 final AtomicInteger count = new AtomicInteger();
353 final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
354 Observable.create(new OnSubscribe<String>() {
355
356 @Override
357 public void call(final Subscriber<? super String> observer) {
358 final BooleanSubscription s = new BooleanSubscription();
359 new Thread(new Runnable() {
360
361 @Override
362 public void run() {
363 try {
364 if (!s.isUnsubscribed()) {
365 observer.onNext("1");
366 observer.onNext("2");
367 observer.onNext("three");
368 observer.onNext("4");
369 observer.onCompleted();
370 }
371 } finally {
372 latch.countDown();
373 }
374 }
375 }).start();
376 }
377 }).subscribe(new Subscriber<String>() {
378 @Override
379 public void onCompleted() {
380 System.out.println("completed");
381 }
382
383 @Override
384 public void onError(Throwable e) {
385 error.set(e);
386 System.out.println("error");
387 e.printStackTrace();
388 }
389
390 @Override
391 public void onNext(String v) {
392 int num = Integer.parseInt(v);
393 System.out.println(num);
394
395 count.incrementAndGet();
396 }
397
398 });
399
400
401 latch.await();
402
403 assertEquals(2, count.get());
404 assertNotNull(error.get());
405 if (!(error.get() instanceof NumberFormatException)) {
406 fail("It should be a NumberFormatException");
407 }
408 }
409
410
411
412
413
414
415 @Test
416 public void testCustomObservableWithErrorInObserverSynchronous() {
417 final AtomicInteger count = new AtomicInteger();
418 final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
419 Observable.create(new OnSubscribe<String>() {
420
421 @Override
422 public void call(Subscriber<? super String> observer) {
423 observer.onNext("1");
424 observer.onNext("2");
425 observer.onNext("three");
426 observer.onNext("4");
427 observer.onCompleted();
428 }
429 }).subscribe(new Subscriber<String>() {
430
431 @Override
432 public void onCompleted() {
433 System.out.println("completed");
434 }
435
436 @Override
437 public void onError(Throwable e) {
438 error.set(e);
439 System.out.println("error");
440 e.printStackTrace();
441 }
442
443 @Override
444 public void onNext(String v) {
445 int num = Integer.parseInt(v);
446 System.out.println(num);
447
448 count.incrementAndGet();
449 }
450
451 });
452 assertEquals(2, count.get());
453 assertNotNull(error.get());
454 if (!(error.get() instanceof NumberFormatException)) {
455 fail("It should be a NumberFormatException");
456 }
457 }
458
459
460
461
462
463
464
465 @Test
466 public void testCustomObservableWithErrorInObservableSynchronous() {
467 final AtomicInteger count = new AtomicInteger();
468 final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
469 Observable.create(new OnSubscribe<String>() {
470
471 @Override
472 public void call(Subscriber<? super String> observer) {
473 observer.onNext("1");
474 observer.onNext("2");
475 throw new NumberFormatException();
476 }
477 }).subscribe(new Subscriber<String>() {
478
479 @Override
480 public void onCompleted() {
481 System.out.println("completed");
482 }
483
484 @Override
485 public void onError(Throwable e) {
486 error.set(e);
487 System.out.println("error");
488 e.printStackTrace();
489 }
490
491 @Override
492 public void onNext(String v) {
493 System.out.println(v);
494 count.incrementAndGet();
495 }
496
497 });
498 assertEquals(2, count.get());
499 assertNotNull(error.get());
500 if (!(error.get() instanceof NumberFormatException)) {
501 fail("It should be a NumberFormatException");
502 }
503 }
504
505 @Test
506 public void testPublishLast() throws InterruptedException {
507 final AtomicInteger count = new AtomicInteger();
508 ConnectableObservable<String> connectable = Observable.create(new OnSubscribe<String>() {
509 @Override
510 public void call(final Subscriber<? super String> observer) {
511 count.incrementAndGet();
512 new Thread(new Runnable() {
513 @Override
514 public void run() {
515 observer.onNext("first");
516 observer.onNext("last");
517 observer.onCompleted();
518 }
519 }).start();
520 }
521 }).takeLast(1).publish();
522
523
524 final CountDownLatch latch = new CountDownLatch(1);
525 connectable.subscribe(new Action1<String>() {
526 @Override
527 public void call(String value) {
528 assertEquals("last", value);
529 latch.countDown();
530 }
531 });
532
533
534 connectable.subscribe(new Action1<String>() {
535 @Override
536 public void call(String ignored) {
537 }
538 });
539
540 Subscription subscription = connectable.connect();
541 assertTrue(latch.await(1000, TimeUnit.MILLISECONDS));
542 assertEquals(1, count.get());
543 subscription.unsubscribe();
544 }
545
546 @Test
547 public void testReplay() throws InterruptedException {
548 final AtomicInteger counter = new AtomicInteger();
549 ConnectableObservable<String> o = Observable.create(new OnSubscribe<String>() {
550
551 @Override
552 public void call(final Subscriber<? super String> observer) {
553 new Thread(new Runnable() {
554
555 @Override
556 public void run() {
557 counter.incrementAndGet();
558 observer.onNext("one");
559 observer.onCompleted();
560 }
561 }).start();
562 }
563 }).replay();
564
565
566 Subscription s = o.connect();
567 try {
568
569
570 final CountDownLatch latch = new CountDownLatch(2);
571
572
573 o.subscribe(new Action1<String>() {
574
575 @Override
576 public void call(String v) {
577 assertEquals("one", v);
578 latch.countDown();
579 }
580 });
581
582
583 o.subscribe(new Action1<String>() {
584
585 @Override
586 public void call(String v) {
587 assertEquals("one", v);
588 latch.countDown();
589 }
590 });
591
592 if (!latch.await(1000, TimeUnit.MILLISECONDS)) {
593 fail("subscriptions did not receive values");
594 }
595 assertEquals(1, counter.get());
596 } finally {
597 s.unsubscribe();
598 }
599 }
600
601 @Test
602 public void testCache() throws InterruptedException {
603 final AtomicInteger counter = new AtomicInteger();
604 Observable<String> o = Observable.create(new OnSubscribe<String>() {
605
606 @Override
607 public void call(final Subscriber<? super String> observer) {
608 new Thread(new Runnable() {
609
610 @Override
611 public void run() {
612 counter.incrementAndGet();
613 observer.onNext("one");
614 observer.onCompleted();
615 }
616 }).start();
617 }
618 }).cache();
619
620
621 final CountDownLatch latch = new CountDownLatch(2);
622
623
624 o.subscribe(new Action1<String>() {
625
626 @Override
627 public void call(String v) {
628 assertEquals("one", v);
629 latch.countDown();
630 }
631 });
632
633
634 o.subscribe(new Action1<String>() {
635
636 @Override
637 public void call(String v) {
638 assertEquals("one", v);
639 latch.countDown();
640 }
641 });
642
643 if (!latch.await(1000, TimeUnit.MILLISECONDS)) {
644 fail("subscriptions did not receive values");
645 }
646 assertEquals(1, counter.get());
647 }
648
649 @Test
650 public void testCacheWithCapacity() throws InterruptedException {
651 final AtomicInteger counter = new AtomicInteger();
652 Observable<String> o = Observable.create(new OnSubscribe<String>() {
653
654 @Override
655 public void call(final Subscriber<? super String> observer) {
656 new Thread(new Runnable() {
657
658 @Override
659 public void run() {
660 counter.incrementAndGet();
661 observer.onNext("one");
662 observer.onCompleted();
663 }
664 }).start();
665 }
666 }).cache(1);
667
668
669 final CountDownLatch latch = new CountDownLatch(2);
670
671
672 o.subscribe(new Action1<String>() {
673
674 @Override
675 public void call(String v) {
676 assertEquals("one", v);
677 latch.countDown();
678 }
679 });
680
681
682 o.subscribe(new Action1<String>() {
683
684 @Override
685 public void call(String v) {
686 assertEquals("one", v);
687 latch.countDown();
688 }
689 });
690
691 if (!latch.await(1000, TimeUnit.MILLISECONDS)) {
692 fail("subscriptions did not receive values");
693 }
694 assertEquals(1, counter.get());
695 }
696
697
698
699
700
701
702
703
704
705
706 @Test
707 public void testErrorThrownWithoutErrorHandlerSynchronous() {
708 try {
709 Observable.error(new RuntimeException("failure")).subscribe(new Action1<Object>() {
710
711 @Override
712 public void call(Object t1) {
713
714 }
715
716 });
717 fail("expected exception");
718 } catch (Throwable e) {
719 assertEquals("failure", e.getMessage());
720 }
721 }
722
723
724
725
726
727
728
729
730
731
732
733
734 @Test
735 public void testErrorThrownWithoutErrorHandlerAsynchronous() throws InterruptedException {
736 final CountDownLatch latch = new CountDownLatch(1);
737 final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
738 Observable.create(new OnSubscribe<String>() {
739
740 @Override
741 public void call(final Subscriber<? super String> observer) {
742 new Thread(new Runnable() {
743
744 @Override
745 public void run() {
746 try {
747 observer.onError(new Error("failure"));
748 } catch (Throwable e) {
749
750 exception.set(e);
751 }
752 latch.countDown();
753 }
754 }).start();
755 }
756 }).subscribe(new Action1<String>() {
757
758 @Override
759 public void call(String t1) {
760
761 }
762
763 });
764
765 latch.await(3000, TimeUnit.MILLISECONDS);
766 assertNotNull(exception.get());
767 assertEquals("failure", exception.get().getMessage());
768 }
769
770 @Test
771 public void testTakeWithErrorInObserver() {
772 final AtomicInteger count = new AtomicInteger();
773 final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
774 Observable.just("1", "2", "three", "4").take(3).subscribe(new Subscriber<String>() {
775
776 @Override
777 public void onCompleted() {
778 System.out.println("completed");
779 }
780
781 @Override
782 public void onError(Throwable e) {
783 error.set(e);
784 System.out.println("error");
785 e.printStackTrace();
786 }
787
788 @Override
789 public void onNext(String v) {
790 int num = Integer.parseInt(v);
791 System.out.println(num);
792
793 count.incrementAndGet();
794 }
795
796 });
797 assertEquals(2, count.get());
798 assertNotNull(error.get());
799 if (!(error.get() instanceof NumberFormatException)) {
800 fail("It should be a NumberFormatException");
801 }
802 }
803
804 @Test
805 public void testOfType() {
806 Observable<String> observable = Observable.just(1, "abc", false, 2L).ofType(String.class);
807
808 @SuppressWarnings("unchecked")
809 Observer<Object> observer = mock(Observer.class);
810 observable.subscribe(observer);
811 verify(observer, never()).onNext(1);
812 verify(observer, times(1)).onNext("abc");
813 verify(observer, never()).onNext(false);
814 verify(observer, never()).onNext(2L);
815 verify(observer, never()).onError(
816 org.mockito.Matchers.any(Throwable.class));
817 verify(observer, times(1)).onCompleted();
818 }
819
820 @Test
821 public void testOfTypeWithPolymorphism() {
822 ArrayList<Integer> l1 = new ArrayList<Integer>();
823 l1.add(1);
824 LinkedList<Integer> l2 = new LinkedList<Integer>();
825 l2.add(2);
826
827 @SuppressWarnings("rawtypes")
828 Observable<List> observable = Observable.<Object> just(l1, l2, "123").ofType(List.class);
829
830 @SuppressWarnings("unchecked")
831 Observer<Object> observer = mock(Observer.class);
832 observable.subscribe(observer);
833 verify(observer, times(1)).onNext(l1);
834 verify(observer, times(1)).onNext(l2);
835 verify(observer, never()).onNext("123");
836 verify(observer, never()).onError(
837 org.mockito.Matchers.any(Throwable.class));
838 verify(observer, times(1)).onCompleted();
839 }
840
841 @Test
842 public void testContains() {
843 Observable<Boolean> observable = Observable.just("a", "b", null).contains("b");
844
845 @SuppressWarnings("unchecked")
846 Observer<Object> observer = mock(Observer.class);
847 observable.subscribe(observer);
848 verify(observer, times(1)).onNext(true);
849 verify(observer, never()).onNext(false);
850 verify(observer, never()).onError(
851 org.mockito.Matchers.any(Throwable.class));
852 verify(observer, times(1)).onCompleted();
853 }
854
855 @Test
856 public void testContainsWithInexistence() {
857 Observable<Boolean> observable = Observable.just("a", "b", null).contains("c");
858
859 @SuppressWarnings("unchecked")
860 Observer<Object> observer = mock(Observer.class);
861 observable.subscribe(observer);
862 verify(observer, times(1)).onNext(false);
863 verify(observer, never()).onNext(true);
864 verify(observer, never()).onError(
865 org.mockito.Matchers.any(Throwable.class));
866 verify(observer, times(1)).onCompleted();
867 }
868
869 @Test
870 public void testContainsWithNull() {
871 Observable<Boolean> observable = Observable.just("a", "b", null).contains(null);
872
873 @SuppressWarnings("unchecked")
874 Observer<Object> observer = mock(Observer.class);
875 observable.subscribe(observer);
876 verify(observer, times(1)).onNext(true);
877 verify(observer, never()).onNext(false);
878 verify(observer, never()).onError(
879 org.mockito.Matchers.any(Throwable.class));
880 verify(observer, times(1)).onCompleted();
881 }
882
883 @Test
884 public void testContainsWithEmptyObservable() {
885 Observable<Boolean> observable = Observable.<String> empty().contains("a");
886
887 @SuppressWarnings("unchecked")
888 Observer<Object> observer = mock(Observer.class);
889 observable.subscribe(observer);
890 verify(observer, times(1)).onNext(false);
891 verify(observer, never()).onNext(true);
892 verify(observer, never()).onError(
893 org.mockito.Matchers.any(Throwable.class));
894 verify(observer, times(1)).onCompleted();
895 }
896
897 @Test
898 public void testIgnoreElements() {
899 Observable<Integer> observable = Observable.just(1, 2, 3).ignoreElements();
900
901 @SuppressWarnings("unchecked")
902 Observer<Integer> observer = mock(Observer.class);
903 observable.subscribe(observer);
904 verify(observer, never()).onNext(any(Integer.class));
905 verify(observer, never()).onError(any(Throwable.class));
906 verify(observer, times(1)).onCompleted();
907 }
908
909 @Test
910 public void testJustWithScheduler() {
911 TestScheduler scheduler = new TestScheduler();
912 Observable<Integer> observable = Observable.from(Arrays.asList(1, 2)).subscribeOn(scheduler);
913
914 @SuppressWarnings("unchecked")
915 Observer<Integer> observer = mock(Observer.class);
916 observable.subscribe(observer);
917
918 scheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS);
919
920 InOrder inOrder = inOrder(observer);
921 inOrder.verify(observer, times(1)).onNext(1);
922 inOrder.verify(observer, times(1)).onNext(2);
923 inOrder.verify(observer, times(1)).onCompleted();
924 inOrder.verifyNoMoreInteractions();
925 }
926
927 @Test
928 public void testStartWithWithScheduler() {
929 TestScheduler scheduler = new TestScheduler();
930 Observable<Integer> observable = Observable.just(3, 4).startWith(Arrays.asList(1, 2)).subscribeOn(scheduler);
931
932 @SuppressWarnings("unchecked")
933 Observer<Integer> observer = mock(Observer.class);
934 observable.subscribe(observer);
935
936 scheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS);
937
938 InOrder inOrder = inOrder(observer);
939 inOrder.verify(observer, times(1)).onNext(1);
940 inOrder.verify(observer, times(1)).onNext(2);
941 inOrder.verify(observer, times(1)).onNext(3);
942 inOrder.verify(observer, times(1)).onNext(4);
943 inOrder.verify(observer, times(1)).onCompleted();
944 inOrder.verifyNoMoreInteractions();
945 }
946
947 @Test
948 public void testRangeWithScheduler() {
949 TestScheduler scheduler = new TestScheduler();
950 Observable<Integer> observable = Observable.range(3, 4, scheduler);
951
952 @SuppressWarnings("unchecked")
953 Observer<Integer> observer = mock(Observer.class);
954 observable.subscribe(observer);
955
956 scheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS);
957
958 InOrder inOrder = inOrder(observer);
959 inOrder.verify(observer, times(1)).onNext(3);
960 inOrder.verify(observer, times(1)).onNext(4);
961 inOrder.verify(observer, times(1)).onNext(5);
962 inOrder.verify(observer, times(1)).onNext(6);
963 inOrder.verify(observer, times(1)).onCompleted();
964 inOrder.verifyNoMoreInteractions();
965 }
966
967 @Test
968 public void testCollectToList() {
969 Observable<List<Integer>> o = Observable.just(1, 2, 3).collect(new Func0<List<Integer>>() {
970
971 @Override
972 public List<Integer> call() {
973 return new ArrayList<Integer>();
974 }
975
976 }, new Action2<List<Integer>, Integer>() {
977
978 @Override
979 public void call(List<Integer> list, Integer v) {
980 list.add(v);
981 }
982 });
983
984 List<Integer> list = o.toBlocking().last();
985
986 assertEquals(3, list.size());
987 assertEquals(1, list.get(0).intValue());
988 assertEquals(2, list.get(1).intValue());
989 assertEquals(3, list.get(2).intValue());
990
991
992 List<Integer> list2 = o.toBlocking().last();
993
994 assertEquals(3, list2.size());
995 assertEquals(1, list2.get(0).intValue());
996 assertEquals(2, list2.get(1).intValue());
997 assertEquals(3, list2.get(2).intValue());
998 }
999
1000 @Test
1001 public void testCollectToString() {
1002 String value = Observable.just(1, 2, 3).collect(new Func0<StringBuilder>() {
1003
1004 @Override
1005 public StringBuilder call() {
1006 return new StringBuilder();
1007 }
1008
1009 }, new Action2<StringBuilder, Integer>() {
1010
1011 @Override
1012 public void call(StringBuilder sb, Integer v) {
1013 if (sb.length() > 0) {
1014 sb.append("-");
1015 }
1016 sb.append(v);
1017 }
1018 }).toBlocking().last().toString();
1019
1020 assertEquals("1-2-3", value);
1021 }
1022
1023 @Test
1024 public void testMergeWith() {
1025 TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
1026 Observable.just(1).mergeWith(Observable.just(2)).subscribe(ts);
1027 ts.assertReceivedOnNext(Arrays.asList(1, 2));
1028 }
1029
1030 @Test
1031 public void testConcatWith() {
1032 TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
1033 Observable.just(1).concatWith(Observable.just(2)).subscribe(ts);
1034 ts.assertReceivedOnNext(Arrays.asList(1, 2));
1035 }
1036
1037 @Test
1038 public void testAmbWith() {
1039 TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
1040 Observable.just(1).ambWith(Observable.just(2)).subscribe(ts);
1041 ts.assertReceivedOnNext(Arrays.asList(1));
1042 }
1043
1044 @Test(expected = OnErrorNotImplementedException.class)
1045 public void testSubscribeWithoutOnError() {
1046 Observable<String> o = Observable.just("a", "b").flatMap(new Func1<String, Observable<String>>() {
1047 @Override
1048 public Observable<String> call(String s) {
1049 return Observable.error(new Exception("test"));
1050 }
1051 });
1052 o.subscribe();
1053 }
1054
1055 @Test
1056 public void testTakeWhileToList() {
1057 final int expectedCount = 3;
1058 final AtomicInteger count = new AtomicInteger();
1059 for (int i = 0;i < expectedCount; i++) {
1060 Observable
1061 .just(Boolean.TRUE, Boolean.FALSE)
1062 .takeWhile(new Func1<Boolean, Boolean>() {
1063 @Override
1064 public Boolean call(Boolean value) {
1065 return value;
1066 }
1067 })
1068 .toList()
1069 .doOnNext(new Action1<List<Boolean>>() {
1070 @Override
1071 public void call(List<Boolean> booleans) {
1072 count.incrementAndGet();
1073 }
1074 })
1075 .subscribe();
1076 }
1077 assertEquals(expectedCount, count.get());
1078 }
1079
1080 @Test
1081 public void testCompose() {
1082 TestSubscriber<String> ts = new TestSubscriber<String>();
1083 Observable.just(1, 2, 3).compose(new Transformer<Integer, String>() {
1084
1085 @Override
1086 public Observable<String> call(Observable<Integer> t1) {
1087 return t1.map(new Func1<Integer, String>() {
1088
1089 @Override
1090 public String call(Integer t1) {
1091 return String.valueOf(t1);
1092 }
1093
1094 });
1095 }
1096
1097 }).subscribe(ts);
1098 ts.assertTerminalEvent();
1099 ts.assertNoErrors();
1100 ts.assertReceivedOnNext(Arrays.asList("1", "2", "3"));
1101 }
1102
1103 @Test
1104 public void testErrorThrownIssue1685() {
1105 Subject<Object, Object> subject = ReplaySubject.create();
1106
1107 Observable.error(new RuntimeException("oops"))
1108 .materialize()
1109 .delay(1, TimeUnit.SECONDS)
1110 .dematerialize()
1111 .subscribe(subject);
1112
1113 subject.subscribe();
1114 subject.materialize().toBlocking().first();
1115
1116 System.out.println("Done");
1117 }
1118
1119 @Test
1120 public void testEmptyIdentity() {
1121 assertEquals(Observable.empty(), Observable.empty());
1122 }
1123
1124 @Test
1125 public void testEmptyIsEmpty() {
1126 Observable.<Integer>empty().subscribe(w);
1127
1128 verify(w).onCompleted();
1129 verify(w, never()).onNext(any(Integer.class));
1130 verify(w, never()).onError(any(Throwable.class));
1131 }
1132
1133 @Test
1134 public void testSubscribingSubscriberAsObserverMaintainsSubscriptionChain() {
1135 TestSubscriber<Object> subscriber = new TestSubscriber<Object>();
1136 Subscription subscription = Observable.just("event").subscribe((Observer<Object>) subscriber);
1137 subscription.unsubscribe();
1138
1139 subscriber.assertUnsubscribed();
1140 }
1141 }